package org.nbict.iot.task.realtime.func;

import com.alibaba.fastjson.JSONObject;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

/**
 * Created by songseven on 18/6/25.
 */
/**
 * Trident function that flattens a packet's header and body into one output tuple.
 *
 * <p>The header JSON object is read from input position 1 and the body JSON object from
 * input position 2. The emitted tuple always carries four values, in this order:
 * the header's {@code "car_vin"} string, the header's {@code "header_cmd"} integer,
 * the body's {@code "timestamp"} long, and the body's {@code "upload"} JSON object.
 */
public class PacketResolver extends BaseFunction {

    /**
     * Extracts the VIN, command code, timestamp and upload payload, then emits them.
     *
     * @param tuple     input tuple; positions 1 and 2 are cast to {@link JSONObject}
     *                  (header and body respectively)
     * @param collector collector that receives the single four-value output tuple
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        final JSONObject packetHeader = (JSONObject) tuple.get(1);
        final JSONObject packetBody = (JSONObject) tuple.get(2);

        final String vin = packetHeader.getString("car_vin");
        final Integer command = packetHeader.getInteger("header_cmd");
        final Long eventTime = packetBody.getLong("timestamp");

        // The upload payload is emitted even when it is null, so the output tuple
        // always has four values. NOTE(review): presumably downstream declares four
        // output fields and relies on this fixed arity — confirm before adding a guard.
        final JSONObject uploadPayload = (JSONObject) packetBody.get("upload");

        collector.emit(new Values(vin, command, eventTime, uploadPayload));
    }
}
